package com.zhang.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL demo job: registers a table backed by the {@code mysql-cdc}
 * connector and prints the rows of a {@code SELECT *} query over it.
 *
 * @author: zhang
 * @date: 2022/3/3 14:20
 */
public class FlinkCDCSQL {
    /**
     * Creates the stream and table environments, declares the CDC source table
     * {@code user_info}, and prints the result of querying it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception kept in the signature for backward compatibility
     */
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the stream execution environment; all operators run with parallelism 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Step 2: create the table environment on top of the stream environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Step 3: declare the dynamic table that maps to the MySQL table
        // gmall_realtime_2022.user through the mysql-cdc connector.
        // NOTE(review): connection settings and credentials are hard-coded here;
        // consider externalizing them before using this outside a local demo.
        // NOTE(review): newer mysql-cdc connector versions may require a
        // "PRIMARY KEY (...) NOT ENFORCED" clause when incremental snapshot
        // reading is enabled - confirm against the connector version in use.
        tableEnv.executeSql("CREATE TABLE user_info (" +
                " id INT," +
                " name STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'hadoop103'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '000000'," +
                " 'database-name' = 'gmall_realtime_2022'," +
                " 'table-name' = 'user'" +
                ")");

        // Step 4: run the query and print its rows to stdout.
        // executeSql(...) submits the SELECT as a job by itself, so no explicit
        // env.execute() follows: with no DataStream operators defined on env,
        // that call would fail with IllegalStateException
        // ("No operators defined in streaming topology. Cannot execute.").
        tableEnv.executeSql("select * from user_info").print();
    }
}
